Handle database timeouts in MQTT queue deletion #12317

Merged: 3 commits into main from md/khepri/mqtt-fixes on Sep 16, 2024

the-mikedavis
Member

This fixes some crash reports seen when using MQTT with Khepri, spotted by @mkuratczyk. With an OMQ stress test:

omq mqtt --uri mqtt://localhost:1883 --uri mqtt://localhost:1884 --uri mqtt://localhost:1885 -x 10000 -y 10000 -r 1 --publish-to 'sensor/%d' --consume-from '/topic/sensor/%d' --mqtt-consumer-qos 1 --mqtt-publisher-qos 1

while a cluster restarts (make restart-cluster), we would see badmatch errors from matching the result of rabbit_queue_type:delete/4 against {ok, _}, and exits with {normal, {gen_server2, call, [Pid, consumers, infinity]}}. That stress test causes queue churn because QoS 1 MQTT creates transient exclusive classic queues. Restarting a node causes a large number of queues to be deleted at once, which can overload Khepri and lead to timeouts.

The first commit refactors rabbit_queue_type:delete/4 to return {error, timeout} for timeout errors. {error, timeout} could already be returned and is handled in rabbit_amqqueue:delete_with/4. This change is just for consistency: in some places we returned a protocol_error record instead. The second commit handles the {error, timeout} result in rabbit_mqtt_processor.
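The difference between the crashing match and a tolerant one can be sketched as follows. This is a minimal illustration, not the code from this PR: `fake_delete/1` is a hypothetical stand-in for `rabbit_queue_type:delete/4` that only mimics the two result shapes mentioned above, and what the real MQTT processor does after a timeout is not shown here.

```erlang
-module(delete_timeout_sketch).
-export([delete_strict/1, delete_tolerant/1]).

%% Hypothetical stub standing in for rabbit_queue_type:delete/4.
%% The atom 'overloaded' simulates a metadata store timeout.
fake_delete(overloaded) -> {error, timeout};
fake_delete(_QueueName) -> {ok, 0}.

%% Strict match: fails with a badmatch error when the delete times out.
delete_strict(QueueName) ->
    {ok, _MsgCount} = fake_delete(QueueName),
    ok.

%% Tolerant match: the timeout becomes an ordinary return value that the
%% caller can decide how to handle (assumption: it is simply passed on).
delete_tolerant(QueueName) ->
    case fake_delete(QueueName) of
        {ok, _MsgCount} -> ok;
        {error, timeout} -> {error, timeout}
    end.
```

With this stub, `delete_strict(overloaded)` raises `{badmatch, {error, timeout}}`, while `delete_tolerant(overloaded)` returns `{error, timeout}` to its caller.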

Also included is a fix for rabbit_amqqueue:consumers/1 to catch exits: an exit can happen if another process asks for a classic queue's consumers while it is terminating. (With Khepri the terminate callback can take some time as it calls rabbit_amqqueue:internal_delete/2.)

The `{error, timeout}` return value was already possible since a classic
queue will return it during termination if
`rabbit_amqqueue:internal_delete/2` fails with that value.

`rabbit_amqqueue:delete/4` already handles this value and converts it
into a protocol error and channel exit. The other caller (MQTT
processor) will be updated in a child commit.

This commit also replaces eager conversions to protocol errors in
rabbit_classic_queue, rabbit_quorum_queue and rabbit_stream_coordinator:
we should return `{error, timeout}` consistently and not hide it in
protocol errors.

`delegate:invoke/2` catches errors but not exits of the delegate
process. Another process might query for a classic queue's consumers
while the classic queue is being deleted or otherwise terminating.
Previously, that would cause the calling process to exit.
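The exit described here can be reproduced in isolation with a plain `gen_server:call/3` to a process that has already terminated. The sketch below is an assumption-laden illustration: it uses `gen_server` rather than `gen_server2`, the module and function names are hypothetical, and returning an empty list on exit is an assumed fallback rather than a statement of what `rabbit_amqqueue:consumers/1` returns.

```erlang
-module(exit_catch_sketch).
-export([dead_pid/0, consumers_unsafe/1, consumers_safe/1]).

%% Returns the pid of a process that is guaranteed to have terminated,
%% by waiting for its 'DOWN' monitor message.
dead_pid() ->
    {Pid, Ref} = spawn_monitor(fun() -> ok end),
    receive
        {'DOWN', Ref, process, Pid, _Reason} -> Pid
    end.

%% Without protection, the call exits the caller when the target is gone.
consumers_unsafe(Pid) ->
    gen_server:call(Pid, consumers, infinity).

%% Catching the exit class turns the failure into a normal return value.
%% A catch clause for 'error' alone would not intercept it.
consumers_safe(Pid) ->
    try
        gen_server:call(Pid, consumers, infinity)
    catch
        exit:_Reason -> []   % assumed fallback for this sketch
    end.
```

Calling `consumers_unsafe(dead_pid())` exits with a `noproc` reason, whereas `consumers_safe(dead_pid())` returns `[]`.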
@the-mikedavis the-mikedavis self-assigned this Sep 16, 2024
@the-mikedavis the-mikedavis marked this pull request as ready for review September 16, 2024 22:08
@michaelklishin michaelklishin added this to the 4.0.0 milestone Sep 16, 2024
@michaelklishin michaelklishin merged commit 4805e31 into main Sep 16, 2024
281 checks passed
@michaelklishin michaelklishin deleted the md/khepri/mqtt-fixes branch September 16, 2024 23:00